Dify Plugin SDK 源码解析

Dify Plugin SDK 是一个完整的插件框架系统,用于开发和运行 Dify 平台的插件。支持多种插件类型(工具、模型、端点、代理、数据源)和三种部署模式(Local、Remote、Serverless)。

交互架构

核心文件位置:

  • 主入口:dify_plugin/plugin.py
  • 注册管理:dify_plugin/core/plugin_registration.py
  • 执行器:dify_plugin/core/plugin_executor.py
  • IO服务器:dify_plugin/core/server/io_server.py
  • 路由器:dify_plugin/core/server/router.py

核心类继承关系

17628436630721762843662409.png

启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
[开始] 用户创建 Plugin 实例

├─→ [步骤1] Plugin.__init__(config: DifyPluginEnv)
│ │
│ ├─→ [步骤1.1] 创建 PluginRegistration 实例
│ │ │
│ │ ├─→ [步骤1.1.1] _load_plugin_configuration()
│ │ │ └─→ 读取 manifest.yaml
│ │ │ └─→ 读取各组件配置文件 (*.yaml)
│ │ │
│ │ ├─→ [步骤1.1.2] _resolve_plugin_cls()
│ │ │ ├─→ _resolve_tool_providers()
│ │ │ │ └─→ 动态导入工具类
│ │ │ ├─→ _resolve_model_providers()
│ │ │ │ └─→ 动态导入模型类
│ │ │ ├─→ _resolve_endpoints()
│ │ │ │ └─→ 动态导入端点类
│ │ │ ├─→ _resolve_agent_providers()
│ │ │ │ └─→ 动态导入代理类
│ │ │ └─→ _resolve_datasource_providers()
│ │ │ └─→ 动态导入数据源类
│ │ │
│ │ └─→ [步骤1.1.3] _load_plugin_assets()
│ │ └─→ 加载 _assets 目录下的资源文件
│ │
│ ├─→ [步骤1.2] 根据安装方式创建通信流
│ │ ├─→ InstallMethod.LOCAL → _launch_local_stream()
│ │ │ └─→ 创建 StdioRequestReader 和 StdioResponseWriter
│ │ ├─→ InstallMethod.REMOTE → _launch_remote_stream()
│ │ │ └─→ 创建 TCPReaderWriter
│ │ │ └─→ 发送初始化消息(配置、工具声明、模型声明)
│ │ └─→ InstallMethod.SERVERLESS → _launch_serverless_stream()
│ │ └─→ 创建 ServerlessRequestReader
│ │ └─→ 启动 HTTP 服务器
│ │
│ ├─→ [步骤1.3] 创建 PluginExecutor 实例
│ │ └─→ 传入 config 和 registration
│ │
│ ├─→ [步骤1.4] 初始化 IOServer 和 Router
│ │ ├─→ IOServer.__init__()
│ │ │ └─→ 创建线程池 (ThreadPoolExecutor)
│ │ └─→ Router.__init__()
│ │
│ └─→ [步骤1.5] _register_request_routes()
│ ├─→ 注册工具路由 (invoke_tool)
│ ├─→ 注册模型路由 (invoke_llm, invoke_text_embedding, etc.)
│ ├─→ 注册端点路由 (invoke_endpoint)
│ ├─→ 注册代理路由 (invoke_agent_strategy)
│ └─→ 注册数据源路由 (validate_datasource_credentials, etc.)

└─→ [完成] Plugin 实例初始化完成,准备运行

1. Plugin.init(config)

  • 文件:dify_plugin/plugin.py
  • 作用:初始化插件系统的所有组件
  • 参数:config - 包含安装方式、连接信息等配置

2. PluginRegistration.init(config)

  • 文件:dify_plugin/core/plugin_registration.py
  • 作用:加载插件配置和动态加载所有插件类
  • 关键方法:
    • _load_plugin_configuration() - 加载 YAML 配置
    • _resolve_plugin_cls() - 解析并导入 Python 类
    • _load_plugin_assets() - 加载静态资源

3. 通信流创建

  • _launch_local_stream() - 标准输入输出模式
  • _launch_remote_stream() - TCP 连接模式
  • _launch_serverless_stream() - HTTP 服务器模式

监听流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[开始] Plugin.run() 被调用

└─→ IOServer._run()

├─→ [线程1] _setup_instruction_listener()
│ │
│ └─→ [循环] while True:
│ ├─→ request_reader.read()
│ │ └─→ 阻塞等待请求到达
│ │
│ ├─→ 解析请求数据
│ │
│ └─→ executer.submit(_execute_request_in_thread, data)
│ └─→ 提交到线程池异步执行

├─→ [线程2] request_reader.event_loop()
│ └─→ 请求读取器的事件循环(如果支持)

├─→ [线程3] _heartbeat() (仅 Remote 模式)
│ └─→ [循环] 每隔一段时间发送心跳消息
│ └─→ default_writer.write(heartbeat_message)

└─→ [线程4] _parent_alive_check() (仅 Local 模式)
└─→ [循环] 检查父进程是否存活
├─→ 如果父进程退出 → 终止插件进程
└─→ 否则继续等待

1. Plugin.run()

  • 文件:dify_plugin/plugin.py
  • 作用:启动插件运行,调用 IOServer._run()

2. IOServer._run()

  • 文件:dify_plugin/core/server/io_server.py
  • 作用:启动多个监听线程,处理请求和维护连接

3. _setup_instruction_listener()

  • 作用:主监听循环,接收请求并提交到线程池
  • 流程:
    1. 调用 request_reader.read() 阻塞等待
    2. 收到请求后解析数据
    3. 提交到线程池执行 _execute_request_in_thread()

4. request_reader 的实现

  • StdioRequestReader - 从标准输入读取
  • TCPReaderWriter - 从 TCP 连接读取
  • ServerlessRequestReader - 从 HTTP 请求读取

运行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
[请求到达] request_reader.read() 接收到请求

└─→ [步骤1] _execute_request_in_thread(data)

├─→ [步骤2] 创建 Session 对象
│ └─→ Session(request_id, response_writer)

├─→ [步骤3] Router.dispatch(session, data)
│ │
│ ├─→ [步骤3.1] 解析请求类型
│ │ └─→ 从 data 中提取 request_type
│ │
│ ├─→ [步骤3.2] 查找路由处理函数
│ │ └─→ 从路由表中匹配 request_type
│ │
│ └─→ [步骤3.3] 调用对应的处理函数
│ ├─→ invoke_tool → PluginExecutor.invoke_tool()
│ ├─→ invoke_llm → PluginExecutor.invoke_llm()
│ ├─→ invoke_endpoint → PluginExecutor.invoke_endpoint()
│ └─→ 其他请求类型...

├─→ [步骤4] PluginExecutor 执行具体功能
│ │
│ ├─→ [步骤4.1] 从 registration 获取对应的类
│ │ └─→ registration.get_tool_provider(provider_name)
│ │
│ ├─→ [步骤4.2] 实例化类并调用方法
│ │ └─→ tool_instance._invoke(...)
│ │
│ └─→ [步骤4.3] 处理返回结果
│ ├─→ 普通结果 → 直接返回
│ ├─→ 生成器 → 流式返回
│ └─→ 文件 → 分块传输

└─→ [步骤5] 发送响应

├─→ [情况1] 流式响应
│ └─→ for chunk in result:
│ └─→ session.write(chunk)

├─→ [情况2] 二进制文件
│ └─→ 分块读取并发送
│ └─→ session.write_file(file_data)

└─→ [情况3] 普通响应
└─→ session.write(response)
└─→ session.end()

1. _execute_request_in_thread(data)

  • 文件:dify_plugin/core/server/io_server.py
  • 作用:在线程池中执行请求处理
  • 流程:创建 Session → 路由分发 → 执行 → 响应

2. Router.dispatch(session, data)

  • 文件:dify_plugin/core/server/router.py
  • 作用:根据请求类型分发到对应的处理函数
  • 路由表:self._routes 字典,key 为请求类型,value 为处理函数

3. PluginExecutor 的执行方法

  • invoke_tool() - 执行工具调用
  • invoke_llm() - 执行大语言模型调用
  • invoke_text_embedding() - 执行文本嵌入
  • invoke_rerank() - 执行重排序
  • invoke_tts() - 执行文本转语音
  • invoke_speech2text() - 执行语音转文本
  • invoke_moderation() - 执行内容审核
  • invoke_endpoint() - 执行端点调用
  • invoke_agent_strategy() - 执行代理策略

4. Session 对象

  • 作用:封装单次请求的上下文

  • 方法:

    • write(data) - 发送响应数据
    • write_file(file) - 发送文件数据
    • end() - 结束响应

交互流程

1. Local 模式(标准输入输出)
1
2
3
if InstallMethod.Local == config.INSTALL_METHOD:
# 本地安装方式:使用标准输入输出进行通信
request_reader, response_writer = self._launch_local_stream(config)

_launch_local_stream 中创建标准输入输出的读取器和写入器

1
2
3
4
5
6
reader = StdioRequestReader()  # 从标准输入读取请求数据
writer = StdioResponseWriter() # 向标准输出写入响应数据

# 首先向标准输出写入插件配置信息,告知 Dify plugin daemon 主程序插件的能力和配置
# 配置信息以JSON格式输出,后跟两个换行符作为消息结束标志
writer.write(self.registration.configuration.model_dump_json() + "\n\n")

之后便是从标准输入中读取到数据,然后处理请求逻辑,之后通过标准输出返回结果

数据流向:

1
2
3
4
5
6
7
Dify plugin daemon ──[请求]──> 插件stdin ──> StdioRequestReader.read()


处理请求逻辑


Dify plugin daemon <──[响应]── 插件stdout <── StdioResponseWriter.write()
2. Remote 模式(TCP 连接)
1
2
3
if InstallMethod.Remote == config.INSTALL_METHOD:
# 远程安装方式:使用TCP连接与远程 Dify plugin daemon 实例通信
request_reader, response_writer = self._launch_remote_stream(config)

_launch_remote_stream 逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 解析远程安装的主机地址和端口号
install_host, install_port = self._get_remote_install_host_and_port(config)

# 创建TCP读写器,用于与远程 Dify plugin daemon 实例建立TCP连接
# on_connected回调函数会在连接建立后立即调用,用于发送初始化数据
tcp_stream = TCPReaderWriter(
install_host, # 远程主机地址
install_port, # 远程主机端口
config.REMOTE_INSTALL_KEY, # 远程安装密钥,用于身份验证
on_connected=lambda: self._initialize_tcp_stream(tcp_stream), # 连接建立后的回调函数
)

# 启动TCP连接,开始与远程服务器通信
tcp_stream.launch()

tcp_stream.launch() 用于建立连接,连接成功后,会执行回调函数 _initialize_tcp_stream 向远程 Dify plugin daemon 实例发送插件的完整配置信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 1. 发送插件清单声明,包含插件的基本信息和元数据
tcp_stream.write(
InitializeMessage(
type=InitializeMessage.Type.MANIFEST_DECLARATION, # 消息类型:清单声明
data=self.registration.configuration.model_dump(), # 插件配置数据
).model_dump_json()
+ "\n\n" # 消息结束标志:双换行符
)

# 2. 如果插件注册了工具,发送工具配置声明
if self.registration.tools_configuration:
tcp_stream.write(
InitializeMessage(
type=InitializeMessage.Type.TOOL_DECLARATION, # 消息类型:工具声明
data=List(root=self.registration.tools_configuration).model_dump(), # 工具配置列表
).model_dump_json()
+ "\n\n"
)
......

数据流向:

1
2
3
4
5
6
7
Dify plugin daemon ──[TCP请求]──> TCPReaderWriter.read()


处理请求逻辑


Dify plugin daemon <──[TCP响应]── TCPReaderWriter.write()
3. Serverless 模式(HTTP 服务器)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[Dify plugin daemon]                    [插件 HTTP 服务]
│ │
├─→ [步骤1] 插件启动 HTTP 服务 │
│ ├─→ 启动 Flask/HTTP 服务器
│ └─→ 监听 HTTP 端口

├─→ [步骤2] 发送 HTTP 请求 │
│ └─→ POST /invoke │
│ ├─→ Headers │
│ └─→ JSON Body │
│ │
│ ├─→ [步骤3] 接收请求
│ │ └─→ ServerlessRequestReader
│ │
│ ├─→ [步骤4] 处理请求
│ │ └─→ 路由分发 → 执行
│ │
└─→ [步骤5] 接收 HTTP 响应 ←─────────┤
├─→ 状态码 200 │
└─→ JSON 响应体 │

数据流向:

1
2
3
4
5
6
7
Dify plugin daemon ──[HTTP POST]──> ServerlessRequestReader.read()


处理请求逻辑


Dify plugin daemon <──[HTTP Response]── 直接返回 HTTP 响应

请求格式示例:

1
2
3
4
5
6
7
8
9
10
11
{
"request_id": "uuid-1234-5678",
"request_type": "invoke_tool",
"data": {
"provider": "google",
"tool_name": "search",
"parameters": {
"query": "Python tutorial"
}
}
}

响应格式示例:

1
2
3
4
5
6
7
{
"request_id": "uuid-1234-5678",
"status": "success",
"data": {
"result": "搜索结果..."
}
}

完整调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
用户代码

├─→ Plugin(config)
│ │
│ ├─→ PluginRegistration(config)
│ │ ├─→ _load_plugin_configuration()
│ │ │ ├─→ yaml.safe_load()
│ │ │ └─→ 读取各组件配置
│ │ │
│ │ ├─→ _resolve_plugin_cls()
│ │ │ ├─→ _resolve_tool_providers()
│ │ │ │ └─→ importlib.import_module()
│ │ │ ├─→ _resolve_model_providers()
│ │ │ ├─→ _resolve_endpoints()
│ │ │ ├─→ _resolve_agent_providers()
│ │ │ └─→ _resolve_datasource_providers()
│ │ │
│ │ └─→ _load_plugin_assets()
│ │
│ ├─→ _launch_*_stream()
│ │ ├─→ StdioRequestReader() / TCPReaderWriter() / ServerlessRequestReader()
│ │ └─→ StdioResponseWriter() / TCPReaderWriter()
│ │
│ ├─→ PluginExecutor(config, registration)
│ │
│ ├─→ IOServer.__init__()
│ │ └─→ ThreadPoolExecutor()
│ │
│ ├─→ Router.__init__()
│ │
│ └─→ _register_request_routes()
│ └─→ self.register_route(type, handler)

└─→ plugin.run()

└─→ IOServer._run()

├─→ Thread(_setup_instruction_listener)
│ └─→ while True:
│ ├─→ request_reader.read()
│ └─→ executer.submit(_execute_request_in_thread)
│ │
│ └─→ _execute_request()
│ │
│ ├─→ Session(request_id, writer)
│ │
│ ├─→ Router.dispatch(session, data)
│ │ │
│ │ ├─→ 查找路由表
│ │ │
│ │ └─→ handler(session, data)
│ │ │
│ │ └─→ PluginExecutor.invoke_*()
│ │ │
│ │ ├─→ registration.get_*_provider()
│ │ │
│ │ ├─→ provider_instance._invoke()
│ │ │ └─→ 用户实现的插件逻辑
│ │ │
│ │ └─→ 返回结果
│ │
│ └─→ 发送响应
│ ├─→ session.write(data)
│ │ └─→ response_writer.write()
│ │
│ └─→ session.end()

├─→ Thread(request_reader.event_loop)

├─→ Thread(_heartbeat)

└─→ Thread(_parent_alive_check)

核心类方法调用关系

Plugin 类:

1
2
3
4
5
6
7
8
9
Plugin
├─→ __init__(config)
│ ├─→ _launch_local_stream()
│ ├─→ _launch_remote_stream()
│ ├─→ _launch_serverless_stream()
│ └─→ _register_request_routes()

└─→ run()
└─→ IOServer._run()

PluginRegistration 类:

1
2
3
4
5
6
7
8
9
10
11
PluginRegistration
├─→ __init__(config)
│ ├─→ _load_plugin_configuration()
│ ├─→ _resolve_plugin_cls()
│ └─→ _load_plugin_assets()

├─→ get_tool_provider(name)
├─→ get_model_provider(name)
├─→ get_endpoint(name)
├─→ get_agent_provider(name)
└─→ get_datasource_provider(name)

PluginExecutor 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
PluginExecutor
├─→ __init__(config, registration)

├─→ invoke_tool(session, data)
│ ├─→ registration.get_tool_provider()
│ └─→ tool._invoke()

├─→ invoke_llm(session, data)
│ ├─→ registration.get_model_provider()
│ └─→ model._invoke()

├─→ invoke_endpoint(session, data)
│ ├─→ registration.get_endpoint()
│ └─→ endpoint._invoke()

└─→ 其他 invoke_* 方法...

IOServer 类:

1
2
3
4
5
6
7
8
9
10
11
IOServer
├─→ __init__()
│ └─→ ThreadPoolExecutor()

├─→ _run()
│ ├─→ _setup_instruction_listener()
│ ├─→ _heartbeat()
│ └─→ _parent_alive_check()

└─→ _execute_request(session, data)
└─→ Router.dispatch()

Router 类:

1
2
3
4
5
6
7
8
9
10
11
Router
├─→ __init__()
│ └─→ self._routes = {}

├─→ register_route(type, handler)
│ └─→ self._routes[type] = handler

└─→ dispatch(session, data)
├─→ 解析 request_type
├─→ 查找 self._routes[request_type]
└─→ handler(session, data)

插件接口

继承关系图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ToolLike[T] (泛型基类)
├── Tool (工具实现)
│ └── [具体工具实现]

└── AgentStrategy (代理策略)
└── [具体代理策略实现]

ToolProvider (工具提供商)
└── AgentProvider (代理提供商)

ModelProvider (模型提供商)
└── [具体模型提供商实现]
↓ (通过 ModelFactory 创建)
AIModel (AI 模型基类)
├── LargeLanguageModel
├── TextEmbeddingModel
├── RerankModel
├── Speech2TextModel
├── TTSModel
└── ModerationModel

DatasourceProvider (数据源提供商)
├── WebsiteDatasource
├── OnlineDocumentDatasource
└── OnlineDriveDatasource

Endpoint (端点基类)
└── [具体端点实现]